package raft

import (
	"encoding/json"
	"github.com/gogo/protobuf/sortkeys"

	// "encoding/json"
	"fmt"
	"github.com/pingcap-incubator/tinykv/log"
	pb "github.com/pingcap-incubator/tinykv/proto/pkg/eraftpb"
	// "github.com/gogo/protobuf/sortkeys"
	// "github.com/pingcap-incubator/tinykv/log"
	// pb "github.com/pingcap-incubator/tinykv/proto/pkg/eraftpb"
)

var ErrNeedLeader = fmt.Errorf("need leader can do")

func (r *Raft) onHeartbeatsFailed(to uint64, resp *RspHeartbeat, doAgain func(to uint64)) {
	//check term.
	if r.Term < resp.Term {
		r.becomeFollower(resp.Term, 0)
		return
	}
	//reset mach;
	pr := r.Prs[to]
	if pr.Match == 0 {
		log.Fatalf("%s peer(%d)(term=%d)mach is zero.logic error.%+v;%s", r.tag, to, r.Term, resp, r.prs2string())
		return
	}
	//NOTICE-raft:如果不是本轮次到，直接忽略.
	//if pr.Match != resp.reqPrevLogIndex {
	//	log.Warnf("%s pr(%d):not this round message=%+v", r.tag, pr.Match, resp)
	//	return
	//}
	pr.Match--
	pr.Next = pr.Match + 1
	doAgain(to)
}
func (r *Raft) processHeartBeatRequest(req *ReqHeartbeat, resp *RspHeartbeat, from uint64) {
	curTerm := r.Term
	resp.Term = curTerm
	resp.Success = false
	//1.如果term < currentTerm返回 false （
	if req.Term < curTerm {
		log.Warnf("%s req.Term(%d)idx(%d)from(%d) < curTerm(%d)", r.tag, req.Term, req.PrevLogIndex, from, curTerm)
		return
	}
	// 如果接收到的 RPC 请求或响应中，任期号T > currentTerm，那么就令 currentTerm 等于 T，并切换状态为跟随者
	if req.Term > curTerm {
		//不管是否是follower，都需要更新term，所以全部都走这里。
		r.becomeFollower(req.Term, req.LeaderId)
	}
	//req.Term == curTerm
	if r.State == StateCandidate {
		//已经有leader，不必在等机会成为leader
		r.becomeFollower(req.Term, req.LeaderId)
	}
	if r.State == StateLeader {
		//同一term不可能有两个leader 存在两个leader了 直接报错
		log.Fatalf("%s logic error:state is leader.from=%d", r.tag, from)
		return
	}
	r.Lead = req.LeaderId
	//如果日志在 prevLogIndex 位置处的日志条目的任期号和 prevLogTerm 不匹配，则返回 false （5.3 节）
	//如果是带过来prev是空（0），那么表示ok。
	if req.PrevLogIndex == 0 {
		resp.Success = true
		return
	}
	//2.1 如果有日志，那么要比较日志.
	localTerm, err := r.RaftLog.Term(req.PrevLogIndex)
	if err != nil {
		return //false
	}
	if localTerm != req.PrevLogTerm {
		log.Warnf("%s index(%d)localTerm(%d) != req.PrevLogTerm(%d)", r.tag, req.PrevLogIndex, localTerm, req.PrevLogTerm)
		return
	}
	resp.Success = true
}

//make heart beat
func (r *Raft) makeHeartbeat(pr *Progress, hb *ReqHeartbeat) error {
	//如果不是leader 就不用发送心跳消息
	if r.State != StateLeader {
		return ErrNeedLeader
	}
	hb.Term = r.Term
	//领导人的 Id，以便于跟随者重定向请求
	hb.LeaderId = r.id
	//领导人已经提交的日志的索引值
	hb.LeaderCommitId = r.RaftLog.committed
	//新的日志条目紧随之前的索引值
	//         	对于每一个服务器，已经复制给他的日志的最高索引值
	hb.PrevLogIndex = pr.Match
	//求任期号
	t, err := r.RaftLog.Term(pr.Match)
	if err != nil {
		return err
	}
	//prevLogIndex 条目的任期号
	hb.PrevLogTerm = t
	return nil
}

// handleAppendEntries handle AppendEntries RPC request
func (r *Raft) onAppendEntries(m pb.Message) {
	if r.State != StateLeader {
		//可能落后的消息,直接drop.
		return
	}
	// Your Code Here (2A).
	var resp RspAppend
	resp.fromPbMsg(m)
	log.Debugf("onAppendEntries '%d->%d'(%v):%+v", m.GetFrom(), m.GetTo(), m.GetMsgType(), resp)
	//失败处理。
	if false == resp.Success {
		r.onHeartbeatsFailed(m.GetFrom(), &resp.RspHeartbeat, func(to uint64) {
			r.sendAppend(to)
		})
		return
	}
	//成功处理.
	//1-update progress;
	pr := r.Prs[m.GetFrom()]
	pr.Match = resp.LastLogIndex
	pr.Next = pr.Match + 1
	if pr.Match < r.RaftLog.LastIndex() {
		//还有没发的日志，再次发送.
		r.sendAppend(m.GetFrom())
	}
	//leadership transfer state going.
	if pr.Match == r.RaftLog.LastIndex() && r.leadTransferee == m.GetFrom() {
		r.sendTimeoutNow(m.GetFrom())
	}
	//2-update raftLog commit;
	r.updatePrCommits()
	return
}

// handleHeartbeat handle Heartbeat RPC request
func (r *Raft) onHeartbeat(m pb.Message) {
	if r.State != StateLeader {
		//可能落后的消息,直接drop.
		return
	}
	// Your Code Here (2A).
	var resp RspHeartbeat
	resp.fromPbMsg(m)
	log.Debugf("onHeartbeat '%d->%d':%+v", m.GetFrom(), m.GetTo(), resp)
	//失败处理.
	if false == resp.Success {
		r.onHeartbeatsFailed(m.GetFrom(), &resp, func(to uint64) {
			r.sendHeartbeat(to)
		})
		return
	}
	//成功处理.
}

func (r *Raft) updatePrCommits() {
	if r.State != StateLeader {
		return
	}
	//	如果存在一个满足N > commitIndex的 N，并且大多数的matchIndex[i] ≥ N成立，并且log[N].term == currentTerm成立，那么令 commitIndex 等于这个 N （5.3 和 5.4 节）
	var counters = map[uint64]int{}
	//counters[r.RaftLog.LastIndex()] = 1
	for _, pr := range r.Prs {
		v := counters[pr.Match]
		counters[pr.Match] = v + 1
	}
	sortLogIndex := make([]uint64, 0, len(counters))
	for logidx, _ := range counters {
		if logidx > r.RaftLog.committed {
			sortLogIndex = append(sortLogIndex, logidx)
		}
	}
	sortkeys.Uint64s(sortLogIndex)
	prvCnt := 0
	last := len(sortLogIndex) - 1
	total := r.peerCount()
	for ; last >= 0; last-- {
		logidx := sortLogIndex[last]
		if logidx < r.RaftLog.committed {
			//可能会存在落后很多多节点，所以，需要判断下.
			break
		}
		cnt := counters[logidx] + prvCnt
		if IsMajor(cnt, total) {
			//并且log[N].term == currentTerm成立(就是说，不是自己的，不commit)
			term, err := r.RaftLog.Term(logidx)
			if err != nil {
				log.Warnf("%s commit err:term(%d) err:%s", r.tag, logidx, err.Error())
				d, _ := json.Marshal(r.Prs)
				log.Fatalf("%s leader(%d) total=%d;cnt=%d;prevCnt=%d;prs=%+v;logIdxs=%+v;counters=%+v", r.tag, r.Lead, total, cnt, prvCnt, string(d), sortLogIndex, counters)
			} else {
				if term == r.Term {
					if r.RaftLog.committed < logidx {
						r.RaftLog.committed = logidx
						//update the commit id to follower;
						log.Debugf("%s update commit %d", r.tag, r.RaftLog.committed)
						r.broadcastAppend()
					}
				}
			}
			return
		} else {
			prvCnt = cnt
		}
	}
}

func (r *Raft) processEntries(entries []*pb.Entry) bool {
	elen := len(entries)
	if elen == 0 { //no logs;
		return true
	}
	rlog := r.RaftLog
	startIdx := entries[0].Index
	//直接append
	if startIdx > rlog.LastIndex() {
		for _, e := range entries {
			rlog.entries = append(rlog.entries, *e)
		}
		return true
	}
	//有重叠日志，要检查重叠区域是否有冲突;(所有日志带index是连续带.)
	start, err := rlog.pos(entries[0].Index)
	if err != nil {
		log.Errorf("index(%d) error:%s", entries[0].Index, err)
		return false
	}
	//find the conflict pos;
	logLen := len(rlog.entries)
	inpos := 0
	for ; start < uint64(logLen) && inpos < elen; start++ {
		//check conflict;
		if rlog.entries[start].Term != entries[inpos].Term {
			//delete the conflict position logs;
			rlog.entries = rlog.entries[:start]
			if len(rlog.entries) > 0 {
				//日志有冲突，那么就需要重新saveLog
				last := len(rlog.entries) - 1
				ent := &rlog.entries[last]
				if rlog.stabled > ent.Index {
					rlog.stabled = ent.Index
				}
			} else {
				rlog.stabled = entries[inpos].Index - 1
			}
			break //(start,inpos) is conflict position;
		} else {
			inpos++
		}
	}

	//debugf("do append at inpos(%d)",inpos)
	for ; inpos < elen; inpos++ {
		rlog.entries = append(rlog.entries, *entries[inpos])
	}
	return true
}
